[Drain] OnTimer - propagate caused by drain bit up to DoFnRunner#37012
[Drain] OnTimer - propagate caused by drain bit up to DoFnRunner#37012stankiewicz wants to merge 1 commit intoapache:masterfrom
Conversation
|
For stack delta, you look at relevant commit - ontimer |
Summary of ChangesHello @stankiewicz, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request enhances the Apache Beam Java SDK's timer mechanism by introducing a Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
0dbde3d to
69b077f
Compare
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
|
Run Java PreCommit |
|
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions. |
69b077f to
9ff470a
Compare
2d77410 to
2612296
Compare
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #37012 +/- ##
=========================================
Coverage 40.06% 40.06%
Complexity 3404 3404
=========================================
Files 1177 1177
Lines 187083 187083
Branches 3581 3581
=========================================
+ Hits 74947 74953 +6
+ Misses 108744 108739 -5
+ Partials 3392 3391 -1
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
2612296 to
bf072d2
Compare
runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
Outdated
Show resolved
Hide resolved
runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
Show resolved
Hide resolved
bf072d2 to
602243b
Compare
| .setPaneInfo(currentTimer.getPaneInfo()) | ||
| .setCausedByDrain(causedByDrain) | ||
| .setReceiver( | ||
| windowedValue -> |
There was a problem hiding this comment.
@kennknowles , ptal.
causedBy drain will be part of windowedValue but because receiver is lambda from WV to context.outputWindowedValue, I'm pretty sure I'm loosing all WV metadata.
| .setReceiver( | ||
| windowedValue -> { | ||
| checkTimerTimestamp(windowedValue.getTimestamp()); | ||
| outputTo(mainOutputConsumer, windowedValue); |
There was a problem hiding this comment.
@kennknowles here, windowedValue is sent differently, why is that?
There was a problem hiding this comment.
I am not sure. This file was a huge mess, with actually many types of transforms executed in the same file via switch statements. It may just be an accidental piece of history and/or a part that I failed to refactor when I introduced OutputBuilder. My quick read is that this one is the idea way to do it, because it passes the whole WindowedValue to the FnDataReceiver so it doesn't lose dasta. In the other places where it goes through the OnTimerContext it loses metadata.
| Instant timestamp, | ||
| Collection<? extends BoundedWindow> windows, | ||
| PaneInfo paneInfo) { | ||
| builder(value).setTimestamp(timestamp).setWindows(windows).setPaneInfo(paneInfo).output(); |
There was a problem hiding this comment.
@kennknowles , I assume that by introducing builder we freeze outputWindowedValue and have some limited functionality, but as part of receiver we should use outputWindowedValue method as we risk losing any new fields we've added.
There was a problem hiding this comment.
I am not sure if I understand the comment. Here is what I think are answers:
- Yes, we freeze the API for
outputWindowedValue. - No, we shouldn't lose fields, because
builder(value)is an abstract method and it is the job ofOutputReceiver.builder(...)to make sure to set all the builder values to defaults, and propagate the values from the current element or timer context.
c68df92 to
43050a8
Compare
…ions. Mostly passthrough.
43050a8 to
03aa66f
Compare
|
Assigning reviewers: R: @chamikaramj for label java. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
|
|
||
| @Override | ||
| public CausedByDrain causedByDrain() { | ||
| return CausedByDrain.NORMAL; |
There was a problem hiding this comment.
Should this come from the element metadata?
|
|
||
| @Override | ||
| public CausedByDrain causedByDrain() { | ||
| return CausedByDrain.NORMAL; |
There was a problem hiding this comment.
Should this come from the element metadata?
| timestamp, | ||
| outputTimestamp, | ||
| timeDomain, | ||
| CausedByDrain.NORMAL); |
There was a problem hiding this comment.
I think it should propagate here? will that be a follow-up that adds it to the pushback side input DoFnRunner?
| new Instant(0), | ||
| TimeDomain.EVENT_TIME); | ||
| TimeDomain.EVENT_TIME, | ||
| CausedByDrain.NORMAL); |
There was a problem hiding this comment.
Maybe this unit test file should test caused by drain propagation somehow?
| new Instant(((Number) kvMap.get("holdTimestamp")).longValue()), | ||
| paneInfo); | ||
| paneInfo, | ||
| CausedByDrain.NORMAL); |
There was a problem hiding this comment.
I think this should actually come from the data? It is a weird method and I have no context for why it is the way it is...
| Instant timestamp, | ||
| Collection<? extends BoundedWindow> windows, | ||
| PaneInfo paneInfo) { | ||
| builder(value).setTimestamp(timestamp).setWindows(windows).setPaneInfo(paneInfo).output(); |
There was a problem hiding this comment.
I am not sure if I understand the comment. Here is what I think are answers:
- Yes, we freeze the API for
outputWindowedValue. - No, we shouldn't lose fields, because
builder(value)is an abstract method and it is the job ofOutputReceiver.builder(...)to make sure to set all the builder values to defaults, and propagate the values from the current element or timer context.
| HOLD_TIME, | ||
| PaneInfo.NO_FIRING)); | ||
| PaneInfo.NO_FIRING, | ||
| CausedByDrain.NORMAL)); |
There was a problem hiding this comment.
We need some caused by drain tests?
|
Reminder, please take a look at this pr: @chamikaramj |
This pull request enhances the Apache Beam Java SDK's timer mechanism by propagating a causedByDrain flag. This flag allows DoFns to distinguish between timers that fire as part of normal pipeline execution and those that are explicitly triggered during a pipeline draining process. By propagating this information through the core timer interfaces and their implementations, it provides more granular control and context for DoFns reacting to timer events e.g. in future within SDF or ReduceFnRunner.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.